package com.bw.gmall.realtime.dws.app;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.base.BaseApp;
import com.bw.gmall.realtime.common.bean.UserLoginBean;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.function.DorisMapFunction;
import com.bw.gmall.realtime.common.util.DateFormatUtil;
import com.bw.gmall.realtime.common.util.FlinkSinkUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class DwsUserUserLoginWindow extends BaseApp {
    public static void main(String[] args) {
        new DwsUserUserLoginWindow().start(Constant.TOPIC_DWD_TRAFFIC_PAGE,Constant.DWS_USER_USER_LOGIN_WINDOW,4,10024);
    }
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
        // 1、ETL数据
        SingleOutputStreamOperator<JSONObject> etlStream = etl(dataStreamSource);
        // 2、根据UID分组

        SingleOutputStreamOperator<UserLoginBean> processSrteam = etlStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject jsonObject) throws Exception {
                return jsonObject.getJSONObject("common").getString("uid");
            }
        }).process(new KeyedProcessFunction<String, JSONObject, UserLoginBean>() {
            private ValueState<String> loginDtState;

            @Override
            public void open(Configuration parameters) throws Exception {

                // 如果这个状态永不过期,就需要考虑OOM现象

                // 20220303 --- >
                loginDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>("login_dt", String.class));
            }

            @Override
            public void processElement(JSONObject jsonObject, KeyedProcessFunction<String, JSONObject, UserLoginBean>.Context context, Collector<UserLoginBean> collector) throws Exception {
                Long ts = jsonObject.getLong("ts");
                // 当前的登录日期
                String curDt = DateFormatUtil.tsToDate(ts);
                String value = loginDtState.value();
                // 回流
                long backCt = 0L;
                // 独立登录用户数
                long uuCt = 0L;
//                if (value == null || !value.equals(curDt)) {
//                    uuCt = 1L;
//                }
                if (!curDt.equals(value)) {
                    uuCt = 1L;
                }

                if (value != null) {
                    // 大于7天
                    if (ts - DateFormatUtil.dateToTs(value) > 7 * 24 * 60 * 60 * 1000) {
                        backCt = 1L;
                    }
                }
                loginDtState.update(curDt);

                if (backCt != 0 || uuCt != 0) {
                    collector.collect(UserLoginBean.builder()
                            .backCt(backCt)
                            .uuCt(uuCt)
                            .ts(ts)
                            .build());
                }
            }
        });
        // 3.开窗聚合
        SingleOutputStreamOperator<UserLoginBean> reduceStream = processSrteam.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<UserLoginBean>() {
            @Override
            public UserLoginBean reduce(UserLoginBean t1, UserLoginBean t2) throws Exception {
                t1.setBackCt(t1.getBackCt() + t1.getBackCt());
                t2.setUuCt(t2.getUuCt() + t2.getUuCt());
                return t1;
            }
        }, new AllWindowFunction<UserLoginBean, UserLoginBean, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<UserLoginBean> iterable, Collector<UserLoginBean> collector) throws Exception {
                long start = timeWindow.getStart();
                long end = timeWindow.getEnd();
                UserLoginBean userLoginBean = iterable.iterator().next();
                userLoginBean.setStt(DateFormatUtil.tsToDateTime(start));
                userLoginBean.setEdt(DateFormatUtil.tsToDateTime(end));
                userLoginBean.setCurDate(DateFormatUtil.tsToDate(System.currentTimeMillis()));
                collector.collect(userLoginBean);
            }
        });
        // 4、写入Doris
        reduceStream.map(new DorisMapFunction<>()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));


    }

    private SingleOutputStreamOperator<JSONObject> etl(DataStreamSource<String> kafkaSource) {
        return kafkaSource.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(s);
                    JSONObject common = jsonObject.getJSONObject("common");
                    JSONObject page = jsonObject.getJSONObject("page");
                    String uid = common.getString("uid");
                    String ts = jsonObject.getString("ts");
                    String lastPageId = page.getString("last_page_id");

                    // uid!=null and last_pageId = null  自动登录
                    // uid != null and last_pageId.equals("login") 手动登录
                    if (uid != null && ts != null) {
                        if (lastPageId == null || "login".equals(lastPageId)) {
                            collector.collect(jsonObject);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
